Skip to content

Commit

Permalink
Error management for BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi authored and psainics committed Jan 21, 2025
1 parent 77e0ac8 commit 1b88f26
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.common.GCPUtils;
Expand Down Expand Up @@ -91,13 +94,17 @@ public void run(ActionContext context) throws Exception {

// Check for errors
if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
String error = queryJob.getStatus().getExecutionErrors().toString();
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

TableResult queryResults = queryJob.getQueryResults();
if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"",
queryResults.getTotalRows()));
String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
queryResults.getTotalRows());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Schema schema = queryResults.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
Expand Down Expand Up @@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map<String, String> argumentCo
String nonExistingColumnNames = argumentConditionMap.keySet().stream()
.filter(columnName -> !argumentConditionFields.containsKey(columnName))
.collect(Collectors.joining(" ,"));
throw new RuntimeException(String.format(
String error = String.format(
"Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.",
nonExistingColumnNames));
nonExistingColumnNames);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

static void checkIfArgumentsColumnsListExistsInSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -52,6 +56,7 @@
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -151,24 +156,32 @@ public void run(ActionContext context) throws Exception {
if (config.getRetryOnBackendError()) {
try {
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
} catch (Throwable e) {
throw new RuntimeException(e);
} catch (Exception e) {
String error = String.format(
"Failed to execute query with exponential backoff with message: %s", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.USER,
true, GCPUtils.BQ_SUPPORTED_DOC_URL);
}
} else {
executeQuery(bigQuery, queryConfig, context);
}
}

protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
QueryJobConfiguration queryConfig, ActionContext context)
throws Throwable {
void executeQueryWithExponentialBackoff(BigQuery bigQuery,
QueryJobConfiguration queryConfig, ActionContext context) {
try {
Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
} catch (FailsafeException e) {
if (e.getCause() != null) {
throw e.getCause();
String error = String.format(
"Failed to execute query with exponential backoff with cause: %s.", e.getCause());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, e);
}
throw e;
String error = String.format(
"Failed to execute query with exponential backoff with message: %s.", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.USER,
true, GCPUtils.BQ_SUPPORTED_DOC_URL);
}
}

Expand Down Expand Up @@ -201,9 +214,11 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
} catch (BigQueryException e) {
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage());
if (RETRY_ON_REASON.contains(e.getError().getReason())) {
throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
throw new BigQueryJobExecutionException(e.getError().getMessage());
}
throw new RuntimeException(e);
String error = String.format("Failed to execute query with message: %s", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}

// Check for errors
Expand All @@ -214,7 +229,9 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) {
throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
}
throw new RuntimeException(queryJob.getStatus().getError().getMessage());
String error = queryJob.getStatus().getError().getMessage();
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.UNKNOWN, false, null);
}

TableResult queryResults = queryJob.getQueryResults();
Expand Down

0 comments on commit 1b88f26

Please sign in to comment.