diff --git a/pom.xml b/pom.xml
index b264895d60..22136d13b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1210,16 +1210,6 @@
        <version>1.2.8</version>
        <scope>runtime</scope>
-      <dependency>
-        <groupId>com.google.apis</groupId>
-        <artifactId>google-api-services-storage</artifactId>
-        <version>v1-rev20220604-1.32.1</version>
-      </dependency>
-      <dependency>
-        <groupId>com.google.cloud</groupId>
-        <artifactId>google-cloud-storage</artifactId>
-        <version>2.8.0</version>
-      </dependency>
diff --git a/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature
new file mode 100644
index 0000000000..1fdade3e9f
--- /dev/null
+++ b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature
@@ -0,0 +1,454 @@
+# Copyright © 2023 Cask Data, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+@BigQuery_Sink
+Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer
+
+ @BQ_UPSERT_SOURCE_TEST @BQ_UPSERT_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Validate successful record transfer from BigQuery source to BigQuery sink with the Upsert operation, updating the destination table schema, when the destination table already exists with records in it.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ And Select radio button plugin property: "operation" with value: "upsert"
+ Then Click on the Add Button of the property: "relationTableKey" with value:
+ | TableKeyUpsert |
+ Then Click plugin property: "updateTableSchema"
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertExpectedFile"
+
+ @BQ_NULL_MODE_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Validate successful record transfer from BigQuery source plugin to BigQuery sink plugin when the source table has all null values in one column and a few null values in another column
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table
+
+ @BQ_UPDATE_SOURCE_DEDUPE_TEST @BQ_UPDATE_SINK_DEDUPE_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Verify successful record transfer from BigQuery source to BigQuery sink using the advanced operation Update with the Dedupe By property.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ And Select radio button plugin property: "operation" with value: "update"
+ Then Enter Value for plugin property table key : "relationTableKey" with values: "relationTableKeyValue"
+ Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeByOrder"
+ Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValue"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpdateDedupeExpectedFile"
+
+ @BQ_INSERT_INT_SOURCE_TEST @BQ_EXISTING_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Verify successful record transfer for the Insert operation with partition type Integer when the destination table already exists.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ Then Select BigQuery sink property partitioning type as "INTEGER"
+ Then Enter input plugin property: "rangeStart" with value: "rangeStartValue"
+ Then Enter input plugin property: "rangeEnd" with value: "rangeEndValue"
+ Then Enter input plugin property: "rangeInterval" with value: "rangeIntervalValue"
+ Then Enter input plugin property: "partitionByField" with value: "partitionByFieldValue"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqInsertExpectedFile"
+
+ @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and a partition field of type date.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDate"
+ Then Click plugin property: "updateTableSchema"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateExpectedFile"
+
+ @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and a partition field of type datetime.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDateTime"
+ Then Click plugin property: "updateTableSchema"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateTimeExpectedFile"
+
+ @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and a partition field of type timestamp.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldTimeStamp"
+ Then Click plugin property: "updateTableSchema"
+ Then Validate "BigQuery" plugin properties
+ Then Close the BigQuery properties
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqTimeStampExpectedFile"
+
+ @BQ_UPSERT_DEDUPE_SOURCE_TEST @BQ_UPSERT_DEDUPE_SINK_TEST @EXISTING_BQ_CONNECTION
+ Scenario: Validate successful record transfer from BigQuery source to BigQuery sink with the Upsert operation, deduplicating the source data into an existing destination table with update table schema set to false
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Click plugin property: "switch-useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Click plugin property: "useConnection"
+ Then Click on the Browse Connections button
+ Then Select connection: "bqConnectionName"
+ Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ And Select radio button plugin property: "operation" with value: "upsert"
+ Then Click on the Add Button of the property: "relationTableKey" with value:
+ | TableKeyDedupe |
+ Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeBy"
+ Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValueUpsert"
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Close the pipeline logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertDedupeFile"
+
+ @BQ_RECORD_SOURCE_TEST @BQ_SECOND_RECORD_SOURCE_TEST @BQ_SINK_TEST
+ Scenario: Validate successful record transfer from two BigQuery source plugins with different schema record names, where BigQuery source plugin 1 has one extra column, and
+ a Wrangler transform plugin removes the extra column before the data is transferred to a BigQuery sink plugin containing the columns from both source plugins.
+ Given Open Datafusion Project to configure pipeline
+ Then Click on the Plus Green Button to import the pipelines
+ Then Select the file for importing the pipeline for the plugin "Directive_Drop"
+ Then Navigate to the properties page of plugin: "BigQuery"
+ Then Replace input plugin property: "project" with value: "projectId"
+ Then Replace input plugin property: "dataset" with value: "dataset"
+ Then Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Click on the Validate button
+ Then Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Replace input plugin property: "project" with value: "projectId"
+ Then Replace input plugin property: "dataset" with value: "dataset"
+ Then Replace input plugin property: "table" with value: "bqSourceTable2"
+ Then Click on the Get Schema button
+ Then Click on the Validate button
+ Then Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery3"
+ Then Replace input plugin property: "project" with value: "projectId"
+ Then Replace input plugin property: "table" with value: "bqTargetTable"
+ Then Replace input plugin property: "dataset" with value: "dataset"
+ Then Click on the Validate button
+ Then Close the Plugin Properties page
+ Then Rename the pipeline
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Close the pipeline logs
+ Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDifferentRecordFile"
+
+ @BQ_INSERT_INT_SOURCE_TEST @BQ_INSERT_SINK_TEST @CDAP-20830
+ Scenario: Validate successful record transfer from BigQuery to BigQuery with the advanced operation Insert, with tables existing for both the source and sink plugins and update table schema set to true.
+ Given Open Datafusion Project to configure pipeline
+ When Expand Plugin group in the LHS plugins list: "Source"
+ When Select plugin: "BigQuery" from the plugins list as: "Source"
+ When Expand Plugin group in the LHS plugins list: "Sink"
+ When Select plugin: "BigQuery" from the plugins list as: "Sink"
+ Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+ Then Navigate to the properties page of plugin: "BigQuery"
+ And Replace input plugin property: "project" with value: "projectId"
+ Then Override Service account details if set in environment variables
+ And Replace input plugin property: "datasetProject" with value: "datasetprojectId"
+ And Replace input plugin property: "referenceName" with value: "reference"
+ And Replace input plugin property: "dataset" with value: "dataset"
+ And Replace input plugin property: "table" with value: "bqSourceTable"
+ Then Click on the Get Schema button
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Navigate to the properties page of plugin: "BigQuery2"
+ Then Replace input plugin property: "project" with value: "projectId"
+ Then Override Service account details if set in environment variables
+ Then Enter input plugin property: "datasetProject" with value: "projectId"
+ Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+ Then Enter input plugin property: "dataset" with value: "dataset"
+ Then Enter input plugin property: "table" with value: "bqTargetTable"
+ And Select radio button plugin property: "operation" with value: "insert"
+ Then Click plugin property: "updateTableSchema"
+ Then Validate "BigQuery" plugin properties
+ And Close the Plugin Properties page
+ Then Save the pipeline
+ Then Preview and run the pipeline
+ Then Wait till pipeline preview is in running state
+ Then Open and capture pipeline preview logs
+ Then Verify the preview run status of pipeline in the logs is "succeeded"
+ Then Close the pipeline logs
+ Then Close the preview
+ Then Deploy the pipeline
+ Then Run the Pipeline in Runtime
+ Then Wait till pipeline is in running state
+ Then Open and capture logs
+ Then Verify the pipeline status is "Succeeded"
+ Then Close the pipeline logs
diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java
index 2c07f9c50c..b6085ccb1e 100644
--- a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java
+++ b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java
@@ -27,7 +27,8 @@
features = {"src/e2e-test/features"},
glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
"stepsdesign", "io.cdap.plugin.common.stepsdesign"},
- tags = {"@BigQuery_Sink"},
+ tags = {"@BigQuery_Sink and not @CDAP-20830"},
+ // TODO: Enable the test once the issue https://cdap.atlassian.net/browse/CDAP-20830 is fixed
monochrome = true,
plugin = {"pretty", "html:target/cucumber-html-report/bigquery-sink",
"json:target/cucumber-reports/cucumber-bigquery-sink.json",
diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java
index dea80dd01e..1347921079 100644
--- a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java
+++ b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java
@@ -29,6 +29,7 @@
"stepsdesign", "io.cdap.plugin.common.stepsdesign"},
tags = {"@BigQuery_Sink_Required"},
monochrome = true,
+ // TODO: Enable the test once the issue https://cdap.atlassian.net/browse/CDAP-20830 is fixed
plugin = {"pretty", "html:target/cucumber-html-report/bigquery-sink-required",
"json:target/cucumber-reports/cucumber-bigquery-sink-required.json",
"junit:target/cucumber-reports/cucumber-bigquery-sink-required.xml"}
diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
index 7e18831e0c..3fc9a35a94 100644
--- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
+++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
@@ -59,6 +59,7 @@ public class TestSetupHooks {
public static String gcsTargetBucketName = StringUtils.EMPTY;
public static String bqTargetTable = StringUtils.EMPTY;
public static String bqSourceTable = StringUtils.EMPTY;
+ public static String bqSourceTable2 = StringUtils.EMPTY;
public static String bqSourceView = StringUtils.EMPTY;
public static String pubSubTargetTopic = StringUtils.EMPTY;
public static String spannerInstance = StringUtils.EMPTY;
@@ -211,7 +212,7 @@ public static void createTargetGCSBucketWithCSVFile() throws IOException, URISyn
BeforeActions.scenario.write("GCS target bucket name - " + gcsTargetBucketName);
}
- @After(order = 1, value = "@GCS_SINK_TEST or @GCS_SINK_EXISTING_BUCKET_TEST or @GCS_SINK_MULTI_PART_UPLOAD")
+ @After(order = 1, value = "@GCS_SINK_TEST or @GCS_SINK_EXISTING_BUCKET_TEST")
public static void deleteTargetBucketWithFile() {
deleteGCSBucket(gcsTargetBucketName);
PluginPropertyUtils.removePluginProp("gcsTargetBucketName");
@@ -1030,6 +1031,261 @@ public static void createSinkBQExistingDatatypeTable() throws IOException, Inter
PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable);
BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully");
}
+
+ @Before(order = 1, value = "@BQ_UPSERT_SOURCE_TEST")
+ public static void createSourceBQUpsertTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64," +
+ "Customer_Exists BOOL ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID, Name, Price, Customer_Exists)" +
+ "VALUES" + "(5, 'Raja', 500.0, true)," +
+ "(6, 'Tom', 100.0, false)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_UPSERT_SINK_TEST")
+ public static void createSinkBQUpsertTable() throws IOException, InterruptedException {
+ bqTargetTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ target table name - " + bqTargetTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64," +
+ "Customer_Exists BOOL ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID, Name, Price, Customer_Exists)" +
+ "VALUES" + "(5, 'Rakesh', 500.0, true)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully");
+ }
+
+ @Before(value = "@BQ_NULL_MODE_SOURCE_TEST")
+ public static void createNullSourceBQTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(Address STRING, id INT64, Firstname STRING," +
+ "LastName STRING)");
+ try {
+ BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(Address, id, Firstname, LastName)" +
+ "VALUES" + "('Agra', 1, 'Harry','')," +
+ "('Noida', 2, '','')");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully");
+ }
+
+ @Before(value = "@BQ_UPDATE_SOURCE_DEDUPE_TEST")
+ public static void createSourceBQDedupeTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID INT64, Name STRING, Price FLOAT64, " +
+ "Customer_Exists BOOL)");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(Name, ID, Price,Customer_Exists)" +
+ "VALUES" + "('string_1', 1, 0.1,true)," +
+ "('string_1', 2, 0.2,false)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully");
+ }
+
+ @Before(value = "@BQ_UPDATE_SINK_DEDUPE_TEST")
+ public static void createSinkBQDedupeTable() throws IOException, InterruptedException {
+ bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ target table name - " + bqTargetTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID INT64, Name STRING, Price FLOAT64, " +
+ "Customer_Exists BOOL)");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " +
+ "(Name, ID, Price,Customer_Exists)" +
+ "VALUES" + "('string_0', 0, 0,true)," +
+ "('string_1', 10, 1.1,false)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully");
+ }
+
+ @Before(value = "@BQ_INSERT_INT_SOURCE_TEST")
+ public static void createSourceBQTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
+ BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID INT64, Name STRING, Price FLOAT64, Customer_Exists BOOL)");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID, Name, Price,Customer_Exists)" +
+ "VALUES" + "(3, 'Rajan Kumar', 100.0, true)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_TIME_SOURCE_TEST")
+ public static void createTimeStampBQTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ source table name - " + bqSourceTable);
+ BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID STRING, transaction_date DATE, Firstname STRING," +
+ " transaction_dt DATETIME, updated_on TIMESTAMP )");
+ try {
+ BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID, transaction_date, Firstname, transaction_dt, updated_on )" +
+ "VALUES" + "('Agra', '2021-02-20', 'Neera','2019-07-07 11:24:00', " +
+ "'2019-03-10 04:50:01 UTC')," +
+ "('Noida', '2021-02-21','', '2019-07-07 11:24:00', " +
+ "'2019-03-10 04:50:01 UTC')," +
+ "('Gurgaon', '2021-02-22', 'singh', '2019-07-07 11:24:00', " +
+ "'2019-03-10 04:50:01 UTC' )");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SOURCE_TEST")
+ public static void createSourceBQDedupeUpsertTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64," +
+ "Customer_Exists BOOL ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID, Name, Price, Customer_Exists)" +
+ "VALUES" + "(1, 'string_1', 0.1, true)," +
+ "(2, 'string_1', 0.2, false)," +
+ "(3, 'string_3', 0.3, false)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SINK_TEST")
+ public static void createSinkBQDedupeUpsertTable() throws IOException, InterruptedException {
+ bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ target table name - " + bqTargetTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64," +
+ "Customer_Exists BOOL ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID, Name, Price, Customer_Exists)" +
+ "VALUES" + "(0, 'string_0', 0, true)," +
+ "(10, 'string_1', 1.1, false)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_RECORD_SOURCE_TEST")
+ public static void createSourceBQRecordTable() throws IOException, InterruptedException {
+ bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64," +
+ "TableName STRING ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+ "(ID, Name, Price, TableName)" +
+ "VALUES" + "(1, 'string_1', 0.1, 'Test')");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_SECOND_RECORD_SOURCE_TEST")
+ public static void createSourceBQSecondRecordTable() throws IOException, InterruptedException {
+ bqSourceTable2 = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable2 + "` " +
+ "(ID INT64, Name STRING, " + "Price FLOAT64 ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable2 + "` " +
+ "(ID, Name, Price)" +
+ "VALUES" + "(1, 'string_1', 0.1)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp("bqSourceTable2", bqSourceTable2);
+ BeforeActions.scenario.write("BQ Source Table " + bqSourceTable2 + " created successfully");
+ }
+
+ @Before(order = 1, value = "@BQ_INSERT_SINK_TEST")
+ public static void createSinkBQInsertTable() throws IOException, InterruptedException {
+ bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+ PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable);
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID INT64,Name STRING," +
+ "id_Value INT64, Customer_Exists BOOL ) ");
+ try {
+ io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " +
+ "(ID, Name, id_Value, Customer_Exists)" +
+ "VALUES" + "(3, 'Rajan Kumar', 100, true)");
+ } catch (NoSuchElementException e) {
+ // Insert query does not return any record.
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException
+ BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace());
+ }
+ PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable);
+ BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully");
+ }
private static String createGCSBucketLifeCycle() throws IOException, URISyntaxException {
String bucketName = StorageClient.createBucketwithLifeCycle("00000000-e2e-" + UUID.randomUUID(), 30).getName();
PluginPropertyUtils.addPluginProp("gcsTargetBucketName", bucketName);
diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties
index 50bd04bb8b..ae191f7c90 100644
--- a/src/e2e-test/resources/pluginParameters.properties
+++ b/src/e2e-test/resources/pluginParameters.properties
@@ -207,6 +207,31 @@ bqDataTypeTestFileSchema1=[{"key":"Id","value":"long"},{"key":"Value","value":"l
{"key":"UID","value":"string"}]
bqExpectedFile=testdata/BigQuery/BQExistingTableFile
bgInsertDatatypeFile=testdata/BigQuery/BQInsertDatatypeFile
+TableKeyUpsert=ID
+TableKeyInsert=ID
+bqUpsertExpectedFile=testdata/BigQuery/BQUpsertTableFile
+bqUpdateDedupeExpectedFile=testdata/BigQuery/BQUpdateDedupeFile
+bqInsertExpectedFile=testdata/BigQuery/BQInsertIntFile
+relationTableKeyValue=Name
+dedupeByOrder=ASC
+dedupeByValue=ID
+dedupeByValueUpsert=Price
+rangeStartValue=2
+rangeEndValue=3
+rangeIntervalValue=1
+partitionByFieldValue=ID
+bqPartitionFieldDateTime=transaction_dt
+bqPartitionFieldTimeStamp=updated_on
+bqSourceTable2=dummy
+dedupeBy=DESC
+TableKeyDedupe=Name
+Directive_Drop=testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json
+bqUpsertDedupeFile=testdata/BigQuery/BQUpsertDedupeFile
+bqDifferentRecordFile=testdata/BigQuery/BQDifferentRecordNameFile
+bqDateExpectedFile=testdata/BigQuery/BQDateFile
+bqDateTimeExpectedFile=testdata/BigQuery/BQDateTimeFile
+bqTimeStampExpectedFile=testdata/BigQuery/BQTimeStampFile
+bqPartitionFieldDate=transaction_date
## BIGQUERY-PLUGIN-PROPERTIES-END
## PUBSUBSINK-PLUGIN-PROPERTIES-START
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateFile b/src/e2e-test/resources/testdata/BigQuery/BQDateFile
new file mode 100644
index 0000000000..9f24705f5c
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQDateFile
@@ -0,0 +1,3 @@
+{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile
new file mode 100644
index 0000000000..9f24705f5c
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile
@@ -0,0 +1,3 @@
+{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile
new file mode 100644
index 0000000000..18336cbbd0
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile
@@ -0,0 +1,2 @@
+{"ID":1,"Name":"string_1","Price":0.1}
+{"ID":1,"Name":"string_1","Price":0.1}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile
new file mode 100644
index 0000000000..aeed733779
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile
@@ -0,0 +1,2 @@
+{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0}
+{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile
new file mode 100644
index 0000000000..9f24705f5c
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile
@@ -0,0 +1,3 @@
+{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
+{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile
new file mode 100644
index 0000000000..6e2a839642
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile
@@ -0,0 +1,2 @@
+{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0}
+{"Customer_Exists":true,"ID":1,"Name":"string_1","Price":0.1}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile
new file mode 100644
index 0000000000..550a80c916
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile
@@ -0,0 +1,3 @@
+{"Customer_Exists":false,"ID":3,"Name":"string_3","Price":0.3}
+{"Customer_Exists":false,"ID":2,"Name":"string_1","Price":0.2}
+{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile
new file mode 100644
index 0000000000..0b48d57e4d
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile
@@ -0,0 +1,2 @@
+{"Customer_Exists":true,"ID":5,"Name":"Raja","Price":500.0}
+{"Customer_Exists":false,"ID":6,"Name":"Tom","Price":100.0}
\ No newline at end of file
diff --git a/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json
new file mode 100644
index 0000000000..e1b1966d3f
--- /dev/null
+++ b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json
@@ -0,0 +1,183 @@
+{
+ "name": "test_diffschema_record",
+ "description": "Data Pipeline Application",
+ "artifact": {
+ "name": "cdap-data-pipeline",
+ "version": "6.10.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "config": {
+ "resources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "driverResources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "connections": [
+ {
+ "from": "BigQuery",
+ "to": "Wrangler"
+ },
+ {
+ "from": "Wrangler",
+ "to": "BigQuery3"
+ },
+ {
+ "from": "BigQuery2",
+ "to": "BigQuery3"
+ }
+ ],
+ "comments": [],
+ "postActions": [],
+ "properties": {},
+ "processTimingEnabled": true,
+ "stageLoggingEnabled": false,
+ "stages": [
+ {
+ "name": "BigQuery",
+ "plugin": {
+ "name": "BigQueryTable",
+ "type": "batchsource",
+ "label": "BigQuery",
+ "artifact": {
+ "name": "google-cloud",
+ "version": "0.22.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "properties": {
+ "useConnection": "false",
+ "project": "cdf-athena",
+ "serviceAccountType": "filePath",
+ "serviceFilePath": "auto-detect",
+ "referenceName": "bq_ref",
+ "dataset": "bq_automation",
+ "table": "bqSourceTableMore",
+ "enableQueryingViews": "false",
+ "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}"
+ }
+ },
+ "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}",
+ "id": "BigQuery",
+ "type": "batchsource",
+ "label": "BigQuery",
+ "icon": "fa-plug"
+ },
+ {
+ "name": "Wrangler",
+ "plugin": {
+ "name": "Wrangler",
+ "type": "transform",
+ "label": "Wrangler",
+ "artifact": {
+ "name": "wrangler-transform",
+ "version": "4.10.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "properties": {
+ "field": "*",
+ "precondition": "false",
+ "directives": "drop :TableName",
+ "on-error": "fail-pipeline",
+ "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+ "workspaceId": "7038fc39-732e-4d75-8d3f-db6cfe5a11d8"
+ }
+ },
+ "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+ "inputSchema": [
+ {
+ "name": "BigQuery",
+ "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}"
+ }
+ ],
+ "id": "Wrangler",
+ "type": "transform",
+ "label": "Wrangler",
+ "icon": "icon-DataPreparation"
+ },
+ {
+ "name": "BigQuery3",
+ "plugin": {
+ "name": "BigQueryTable",
+ "type": "batchsink",
+ "label": "BigQuery3",
+ "artifact": {
+ "name": "google-cloud",
+ "version": "0.22.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "properties": {
+ "useConnection": "false",
+ "project": "auto-detect",
+ "serviceAccountType": "filePath",
+ "serviceFilePath": "auto-detect",
+ "dataset": "bq_automation",
+ "table": "New_target_table_combine",
+ "operation": "insert",
+ "truncateTable": "false",
+ "allowSchemaRelaxation": "false",
+ "location": "US",
+ "createPartitionedTable": "false",
+ "partitioningType": "TIME",
+ "partitionFilterRequired": "false",
+ "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+ }
+ },
+ "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+ "inputSchema": [
+ {
+ "name": "Wrangler",
+ "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+ },
+ {
+ "name": "BigQuery2",
+ "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+ }
+ ],
+ "id": "BigQuery3",
+ "type": "batchsink",
+ "label": "BigQuery3",
+ "icon": "fa-plug"
+ },
+ {
+ "name": "BigQuery2",
+ "plugin": {
+ "name": "BigQueryTable",
+ "type": "batchsource",
+ "label": "BigQuery2",
+ "artifact": {
+ "name": "google-cloud",
+ "version": "0.22.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "properties": {
+ "useConnection": "false",
+ "project": "cdf-athena",
+ "serviceAccountType": "filePath",
+ "serviceFilePath": "auto-detect",
+ "referenceName": "bq_test",
+ "dataset": "bq_automation",
+ "table": "bqSourceTableLess",
+ "enableQueryingViews": "false",
+ "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}"
+ }
+ },
+ "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}",
+ "id": "BigQuery2",
+ "type": "batchsource",
+ "label": "BigQuery2",
+ "icon": "fa-plug"
+ }
+ ],
+ "schedule": "0 1 */1 * *",
+ "engine": "spark",
+ "numOfRecordsPreview": 100,
+ "rangeRecordsPreview": {
+ "min": 1,
+ "max": "5000"
+ },
+ "maxConcurrentRuns": 1
+ },
+ "version": "fe4ee1e3-6380-11ee-8217-0000003390c8"
+}
\ No newline at end of file