diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java index f8faa92cb..9a30db2c4 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java @@ -19,6 +19,7 @@ import com.google.auth.Credentials; import com.google.cloud.StringEnumValue; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; @@ -33,8 +34,12 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; +import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil; import io.cdap.plugin.gcp.common.GCPUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,16 +75,23 @@ public AbstractBigQueryActionConfig getConfig() { } @Override - public void run(ActionContext context) throws Exception { + public void run(ActionContext context) { config.validate(context.getFailureCollector()); QueryJobConfiguration queryConfig = config.getQueryJobConfiguration(context.getFailureCollector()); JobId jobId = JobId.newBuilder().setRandomJob().build(); // API request - starts the query. - Credentials credentials = config.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), - config.isServiceAccountFilePath()); + Credentials credentials = null; + try { + credentials = config.getServiceAccount() == null ? null : + GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath()); + } catch (Exception e) { + context.getFailureCollector().addFailure( + String.format("Failed to load service account credentials, %s: %s", + e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace()); + context.getFailureCollector().getOrThrowException(); + } BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null); Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); @@ -87,17 +99,50 @@ public void run(ActionContext context) throws Exception { LOG.debug("The BigQuery SQL {}", queryConfig.getQuery()); // Wait for the query to complete - queryJob.waitFor(); + try { + queryJob.waitFor(); + } catch (BigQueryException e) { + String errorMessage = String.format("The bigquery query job failed, %s: %s", + e.getClass().getName(), e.getMessage()); + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, (e).getReason(), e); + } catch (InterruptedException e) { + String errorMessage = String.format("The bigquery query job interrupted, %s: %s", + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.UNKNOWN, true, e); + } // Check for errors if (queryJob.getStatus().getError() != null) { - throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString()); + String errorReason = String.format( + "The bigquery job failed with reason: %s. For more details, see %s", + queryJob.getStatus().getError().getReason(), GCPUtils.BQ_SUPPORTED_DOC_URL); + ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + queryJob.getStatus().getExecutionErrors().toString(), type, true, null, null, + GCPUtils.BQ_SUPPORTED_DOC_URL, null); + } + TableResult queryResults; + try { + queryResults = queryJob.getQueryResults(); + } catch (BigQueryException e) { + String errorMessage = String.format("The bigquery query job failed, %s: %s", + e.getClass().getName(), e.getMessage()); + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e); + } catch (InterruptedException e) { + String errorMessage = String.format("The bigquery query job interrupted, %s: %s", + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.UNKNOWN, false, e); } - - TableResult queryResults = queryJob.getQueryResults(); if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) { - throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"", - queryResults.getTotalRows())); + String error = String.format("The query result total rows should be \"1\" but is \"%d\"", + queryResults.getTotalRows()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } Schema schema = queryResults.getSchema(); diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java index d0ffb827e..cac893ec1 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java @@ -27,6 +27,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.gcp.bigquery.source.BigQuerySource; @@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map argumentCo String nonExistingColumnNames = argumentConditionMap.keySet().stream() .filter(columnName -> !argumentConditionFields.containsKey(columnName)) .collect(Collectors.joining(" ,")); - throw new RuntimeException(String.format( + String error = String.format( "Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.", - nonExistingColumnNames)); + nonExistingColumnNames); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, null); } static void checkIfArgumentsColumnsListExistsInSource( diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index 1566fe4fb..d159e85da 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -44,14 +44,20 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.common.Constants; +import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil; import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil; import io.cdap.plugin.gcp.common.GCPUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +99,7 @@ public final class BigQueryExecute extends AbstractBigQueryAction { } @Override - public void run(ActionContext context) throws Exception { + public void run(ActionContext context) { FailureCollector collector = context.getFailureCollector(); config.validate(collector, context.getArguments().asMap()); QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(config.getSql()); @@ -125,9 +131,16 @@ public void run(ActionContext context) throws Exception { builder.setUseLegacySql(config.isLegacySQL()); // API request - starts the query. - Credentials credentials = config.getServiceAccount() == null ? - null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), - config.isServiceAccountFilePath()); + Credentials credentials = null; + try { + credentials = config.getServiceAccount() == null ? + null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), + config.isServiceAccountFilePath()); + } catch (IOException e) { + collector.addFailure(String.format("Failed to load service account credentials, %s: %s", + e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout()); //create dataset to store the results if not exists if (config.getStoreResults() && !Strings.isNullOrEmpty(datasetName) && @@ -152,23 +165,46 @@ public void run(ActionContext context) throws Exception { try { executeQueryWithExponentialBackoff(bigQuery, queryConfig, context); } catch (Throwable e) { - throw new RuntimeException(e); + String errorMessage = String.format( + "Failed to execute query with exponential backoff, %s: %s", e.getClass().getName(), + e.getMessage()); + if (e instanceof BigQueryException) { + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, + ((BigQueryException) e).getReason(), (Exception) e); + } + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.UNKNOWN, true, e); } } else { - executeQuery(bigQuery, queryConfig, context); + try { + executeQuery(bigQuery, queryConfig, context); + } catch (Exception e) { + String errorMessage = String.format("The bigquery query execution failed, %s: %s", + e.getClass().getName(), e.getMessage()); + String errorReason = null; + if (e instanceof BigQueryException) { + errorReason = ((BigQueryException) e).getReason(); + } + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, errorReason, e); + } } } protected void executeQueryWithExponentialBackoff(BigQuery bigQuery, - QueryJobConfiguration queryConfig, ActionContext context) - throws Throwable { + QueryJobConfiguration queryConfig, ActionContext context) { try { Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context)); } catch (FailsafeException e) { + String errorReason = String.format("The bigquery query execution failed with message: %s", + e.getMessage()); if (e.getCause() != null) { - throw e.getCause(); + errorReason = String.format("The bigquery query execution failed with message: %s", + e.getCause().getMessage()); } - throw e; + throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain( + e.getCause() == null ? e : e.getCause(), errorReason, ErrorType.UNKNOWN, true, + GCPUtils.BQ_SUPPORTED_DOC_URL); } } @@ -185,7 +221,7 @@ private RetryPolicy getRetryPolicy() { } private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context) - throws InterruptedException, BigQueryJobExecutionException { + throws BigQueryJobExecutionException { // Location must match that of the dataset(s) referenced in the query. JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); Job queryJob; @@ -199,25 +235,60 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, // Wait for the query to complete queryJob = queryJob.waitFor(); } catch (BigQueryException e) { + String errorMessage = String.format("The bigquery query execution failed, %s: %s", + e.getClass().getName(), e.getMessage()); LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage()); if (RETRY_ON_REASON.contains(e.getError().getReason())) { throw new BigQueryJobExecutionException(e.getError().getMessage(), e); } - throw new RuntimeException(e); + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e); + } catch (InterruptedException e) { + String errorMessage = String.format("The bigquery query execution interrupted, %s: %s", + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.UNKNOWN, true, e); } // Check for errors if (queryJob.getStatus().getError() != null) { // You can also look at queryJob.getStatus().getExecutionErrors() for all // errors, not just the latest one. - LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError()); + LOG.error("The query job {} failed with reason: {} and error: {}.", jobId.getJob(), + queryJob.getStatus().getError().getReason(), + queryJob.getStatus().getExecutionErrors().toString()); if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) { throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage()); } - throw new RuntimeException(queryJob.getStatus().getError().getMessage()); + String errorReason = String.format( + "The bigquery query execution failed due to reason: %s and error: %s. " + + "For more details, see %s", queryJob.getStatus().getError().getReason(), + queryJob.getStatus().getExecutionErrors().toString(), GCPUtils.BQ_SUPPORTED_DOC_URL); + String errorMessage = String.format( + "The bigquery query execution failed due to reason: %s , error: %s and message: %s", + queryJob.getStatus().getError().getReason(), + queryJob.getStatus().getExecutionErrors().toString(), + queryJob.getStatus().getError().getMessage()); + ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + type, true, null, null, GCPUtils.BQ_SUPPORTED_DOC_URL, null); + } + + TableResult queryResults; + try { + queryResults = queryJob.getQueryResults(); + } catch (BigQueryException e) { + String errorMessage = String.format("Failed to retrieve query result, %s: %s", + e.getClass().getName(), e.getMessage()); + throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e); + } catch (InterruptedException e) { + String errorMessage = String.format("Query result retrieval was interrupted, %s: %s", + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.UNKNOWN, true, e); } - - TableResult queryResults = queryJob.getQueryResults(); long rows = queryResults.getTotalRows(); if (config.shouldSetAsArguments()) { @@ -659,11 +730,12 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue bigQuery.create(JobInfo.of(queryJobConfiguration)); } catch (BigQueryException e) { final String errorMessage; - if (e.getCode() == ERROR_CODE_NOT_FOUND) { - errorMessage = String.format("Resource was not found. Please verify the resource name. If the resource " + - "will be created at runtime, then update to use a macro for the resource name. Error message received " + - "was: %s", e.getMessage()); - } else { + if (e.getCode() == ERROR_CODE_NOT_FOUND) { + errorMessage = String.format( + "Resource was not found. Please verify the resource name. If the resource will be " + + "created at runtime, then update to use a macro for the resource name. " + + "Error message received was %s: %s", e.getClass().getName(), e.getMessage()); + } else { errorMessage = e.getMessage(); } failureCollector.addFailure(String.format("%s. Error code: %s.", errorMessage, e.getCode()), diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorUtil.java new file mode 100644 index 000000000..20626aa87 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorUtil.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.common; + +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil; +import io.cdap.plugin.gcp.common.GCPUtils; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility class to fetch more details from BigQueryException + */ +public final class BigQueryErrorUtil { + + // https://cloud.google.com/bigquery/docs/error-messages#errortable + private static final Map ERROR_REASON_TO_ERROR_TYPE = new HashMap<>(); + private static final Map ERROR_REASON_TO_ERROR_CODE = new HashMap<>(); + + static { + // User Errors + ERROR_REASON_TO_ERROR_TYPE.put("invalid", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("invalidQuery", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("billingTierLimitExceeded", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("resourceInUse", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("resourcesExceeded", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("badRequest", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("invalidUser", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("notFound", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("duplicate", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("accessDenied", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("billingNotEnabled", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("quotaExceeded", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("rateLimitExceeded", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("responseTooLarge", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("blocked", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("proxyAuthenticationRequired", ErrorType.USER); + ERROR_REASON_TO_ERROR_TYPE.put("jobRateLimitExceeded", ErrorType.USER); + + // System Errors + ERROR_REASON_TO_ERROR_TYPE.put("tableUnavailable", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("backendError", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("internalError", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("notImplemented", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("jobBackendError", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("jobInternalError", ErrorType.SYSTEM); + ERROR_REASON_TO_ERROR_TYPE.put("timeout", ErrorType.SYSTEM); + + // Unknown Errors + ERROR_REASON_TO_ERROR_TYPE.put("stopped", ErrorType.UNKNOWN); + } + + /** + * Method to get the error type based on the error reason. + * + * @param errorReason the error reason to classify + * @return the corresponding ErrorType (USER, SYSTEM, UNKNOWN) + */ + public static ErrorType getErrorType(String errorReason) { + if (errorReason != null && ERROR_REASON_TO_ERROR_TYPE.containsKey(errorReason)) { + return ERROR_REASON_TO_ERROR_TYPE.get(errorReason); + } + return ErrorType.UNKNOWN; + } + + /** + * Method to get the Program Failure exception based on error reason + * + * @param errorMessage + * @param errorReason + * @param e + * @return + */ + public static ProgramFailureException getProgramFailureException(String errorMessage, + String errorReason, Exception e) { + ErrorType errorType = BigQueryErrorUtil.getErrorType(errorReason); + return GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorMessage, + errorType, true, GCPUtils.BQ_SUPPORTED_DOC_URL); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java index cebd66792..b14d131fa 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java @@ -69,7 +69,7 @@ public static ProgramFailureException getProgramFailureException(HttpResponseExc ErrorCodeType.HTTP, statusCode.toString(), externalDocumentationLink, e); } - public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(Exception e, String errorReason, + public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(Throwable e, String errorReason, ErrorType errorType, boolean dependency, String externalDocUrl) { diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteConfigTest.java index 9dc2a22f8..34ec48b79 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteConfigTest.java @@ -59,7 +59,7 @@ private BigQueryExecute.Config getConfig(String sql) throws NoSuchFieldException @Test public void testBigQueryExecuteSQLWithNonExistentResource() throws Exception { String errorMessage = "Resource was not found. Please verify the resource name. If the resource will be created " + - "at runtime, then update to use a macro for the resource name. Error message received was: "; + "at runtime, then update to use a macro for the resource name. Error message received was "; int errorCode = 404; BigQueryExecute.Config config = getConfig("select * from dataset.table where id=1"); MockFailureCollector failureCollector = new MockFailureCollector(); @@ -69,8 +69,7 @@ public void testBigQueryExecuteSQLWithNonExistentResource() throws Exception { config.validateSQLSyntax(failureCollector, bigQuery); LOG.warn("size : {}", failureCollector.getValidationFailures().size()); Assert.assertEquals(1, failureCollector.getValidationFailures().size()); - Assert.assertEquals(String.format("%s. Error code: %s.", errorMessage, errorCode), - failureCollector.getValidationFailures().get(0).getMessage()); + Assert.assertTrue(failureCollector.getValidationFailures().get(0).getMessage().contains(errorMessage)); Assert.assertEquals("sql", failureCollector.getValidationFailures().get(0).getCauses().get(0).getAttribute("stageConfig")); } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java index 53fe7c766..e3502a5b7 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java @@ -25,20 +25,18 @@ import com.google.cloud.bigquery.JobStatus; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; -import com.google.common.collect.ImmutableSet; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.etl.api.StageMetrics; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.cdap.etl.mock.validation.MockFailureCollector; -import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import java.util.Set; public class BigQueryExecuteTest { @Mock @@ -109,18 +107,16 @@ public void testExecuteQueryWithExponentialBackoffFailsWithNonRetryError() { Exception exception = Assert.assertThrows(java.lang.RuntimeException.class, () -> { bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); }); - String actualMessage = exception.getMessage(); - Assert.assertEquals(mockErrorMessageNoRetry, actualMessage); + Assert.assertTrue(exception.getMessage().contains(mockErrorMessageNoRetry)); } @Test public void testExecuteQueryWithExponentialBackoffFailsRetryError() { Mockito.when(bigQueryError.getReason()).thenReturn("jobBackendError"); Mockito.when(bigQueryError.getMessage()).thenReturn(errorMessageRetryExhausted); - Exception exception = Assert.assertThrows(BigQueryJobExecutionException.class, () -> { + Exception exception = Assert.assertThrows(ProgramFailureException.class, () -> { bq.executeQueryWithExponentialBackoff(bigQuery, queryJobConfiguration, context); }); - String actualMessage = exception.getMessage(); - Assert.assertEquals(errorMessageRetryExhausted, actualMessage); + Assert.assertTrue(exception.getMessage().contains(errorMessageRetryExhausted)); } @Test