From 1b88f261e136bc056be05c81b93805b80a184cc1 Mon Sep 17 00:00:00 2001
From: AMit-Cloudsufi <amit.singh@cloudsufi.com>
Date: Fri, 17 Jan 2025 06:20:19 +0000
Subject: [PATCH] Error management for BigQuery

---
 .../action/BigQueryArgumentSetter.java        | 13 +++++--
 .../action/BigQueryArgumentSetterConfig.java  |  9 ++++-
 .../gcp/bigquery/action/BigQueryExecute.java  | 37 ++++++++++++++-----
 3 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java
index f8faa92cb..8ced9c7c1 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java
@@ -33,6 +33,9 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.etl.api.action.Action;
 import io.cdap.cdap.etl.api.action.ActionContext;
 import io.cdap.plugin.gcp.common.GCPUtils;
@@ -91,13 +94,17 @@ public void run(ActionContext context) throws Exception {
 
     // Check for errors
     if (queryJob.getStatus().getError() != null) {
-      throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
+      String error = queryJob.getStatus().getExecutionErrors().toString();
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+        error, error, ErrorType.USER, false, null);
     }
 
     TableResult queryResults = queryJob.getQueryResults();
     if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
-      throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"",
-                                               queryResults.getTotalRows()));
+      String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
+        queryResults.getTotalRows());
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+        error, error, ErrorType.USER, false, null);
     }
 
     Schema schema = queryResults.getSchema();
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java
index d0ffb827e..cac893ec1 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java
@@ -27,6 +27,9 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.plugin.common.ConfigUtil;
 import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
@@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map<String, String> argumentCo
     String nonExistingColumnNames = argumentConditionMap.keySet().stream()
       .filter(columnName -> !argumentConditionFields.containsKey(columnName))
       .collect(Collectors.joining(" ,"));
-    throw new RuntimeException(String.format(
+    String error = String.format(
       "Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.",
-      nonExistingColumnNames));
+      nonExistingColumnNames);
+    throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+      error, error, ErrorType.USER, false, null);
   }
 
   static void checkIfArgumentsColumnsListExistsInSource(
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
index 1566fe4fb..7070b5186 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
@@ -44,6 +44,10 @@
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.action.Action;
 import io.cdap.cdap.etl.api.action.ActionContext;
@@ -52,6 +56,7 @@
 import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.CmekUtils;
+import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,24 +156,32 @@ public void run(ActionContext context) throws Exception {
     if (config.getRetryOnBackendError()) {
       try {
         executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
-      } catch (Throwable e) {
-        throw new RuntimeException(e);
+      } catch (Exception e) {
+        String error = String.format(
+          "Failed to execute query with exponential backoff with message: %s", e.getMessage());
+        throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.USER,
+          true, GCPUtils.BQ_SUPPORTED_DOC_URL);
       }
     } else {
       executeQuery(bigQuery, queryConfig, context);
     }
   }
 
-  protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
-                                                   QueryJobConfiguration queryConfig, ActionContext context)
-          throws Throwable {
+  void executeQueryWithExponentialBackoff(BigQuery bigQuery,
+                                          QueryJobConfiguration queryConfig, ActionContext context) {
     try {
       Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
     } catch (FailsafeException e) {
       if (e.getCause() != null) {
-        throw e.getCause();
+        String error = String.format(
+          "Failed to execute query with exponential backoff with cause: %s.", e.getCause());
+        throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+          error, error, ErrorType.USER, false, e);
       }
-      throw e;
+      String error = String.format(
+        "Failed to execute query with exponential backoff with message: %s.", e.getMessage());
+      throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.USER,
+        true, GCPUtils.BQ_SUPPORTED_DOC_URL);
     }
   }
 
@@ -201,9 +214,11 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
     } catch (BigQueryException e) {
       LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage());
       if (RETRY_ON_REASON.contains(e.getError().getReason())) {
-        throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
+        throw new BigQueryJobExecutionException(e.getError().getMessage());
       }
-      throw new RuntimeException(e);
+      String error = String.format("Failed to execute query with message: %s", e.getMessage());
+      throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.UNKNOWN,
+        true, GCPUtils.GCS_SUPPORTED_DOC_URL);
     }
 
     // Check for errors
@@ -214,7 +229,9 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
       if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) {
         throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
       }
-      throw new RuntimeException(queryJob.getStatus().getError().getMessage());
+      String error = queryJob.getStatus().getError().getMessage();
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+        error, error, ErrorType.UNKNOWN, false, null);
     }
 
     TableResult queryResults = queryJob.getQueryResults();