Skip to content

Commit 5d4f7bc

Browse files
Amit-CloudSufipsainics
authored andcommitted
Error management for BigQuery
1 parent 77e0ac8 commit 5d4f7bc

File tree

3 files changed

+85
-27
lines changed

3 files changed

+85
-27
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
import io.cdap.cdap.api.annotation.Description;
3434
import io.cdap.cdap.api.annotation.Name;
3535
import io.cdap.cdap.api.annotation.Plugin;
36+
import io.cdap.cdap.api.exception.ErrorCategory;
37+
import io.cdap.cdap.api.exception.ErrorType;
38+
import io.cdap.cdap.api.exception.ErrorUtils;
3639
import io.cdap.cdap.etl.api.action.Action;
3740
import io.cdap.cdap.etl.api.action.ActionContext;
3841
import io.cdap.plugin.gcp.common.GCPUtils;
@@ -70,34 +73,57 @@ public AbstractBigQueryActionConfig getConfig() {
7073
}
7174

7275
@Override
73-
public void run(ActionContext context) throws Exception {
76+
public void run(ActionContext context) {
7477
config.validate(context.getFailureCollector());
7578

7679
QueryJobConfiguration queryConfig = config.getQueryJobConfiguration(context.getFailureCollector());
7780
JobId jobId = JobId.newBuilder().setRandomJob().build();
7881

7982
// API request - starts the query.
80-
Credentials credentials = config.getServiceAccount() == null ?
81-
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
82-
config.isServiceAccountFilePath());
83+
Credentials credentials = null;
84+
try {
85+
credentials = config.getServiceAccount() == null ? null :
86+
GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
87+
} catch (Exception e) {
88+
context.getFailureCollector().addFailure(
89+
String.format("Failed to load service account credentials: %s", e.getMessage()), null)
90+
.withStacktrace(e.getStackTrace());
91+
context.getFailureCollector().getOrThrowException();
92+
}
8393
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
8494
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
8595

8696
LOG.info("Executing SQL as job {}.", jobId.getJob());
8797
LOG.debug("The BigQuery SQL {}", queryConfig.getQuery());
8898

8999
// Wait for the query to complete
90-
queryJob.waitFor();
100+
try {
101+
queryJob.waitFor();
102+
} catch (InterruptedException e) {
103+
String errorMessage = String.format("The query job was interrupted: %s", e.getMessage());
104+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
105+
errorMessage, errorMessage, ErrorType.SYSTEM, true, e);
106+
}
91107

92108
// Check for errors
93109
if (queryJob.getStatus().getError() != null) {
94-
throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
110+
String error = queryJob.getStatus().getExecutionErrors().toString();
111+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
112+
error, error, ErrorType.UNKNOWN, true, null);
113+
}
114+
TableResult queryResults;
115+
try {
116+
queryResults = queryJob.getQueryResults();
117+
} catch (InterruptedException e) {
118+
String errorMessage = String.format("The query job was interrupted: %s", e.getMessage());
119+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
120+
errorMessage, errorMessage, ErrorType.SYSTEM, true, e);
95121
}
96-
97-
TableResult queryResults = queryJob.getQueryResults();
98122
if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
99-
throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"",
100-
queryResults.getTotalRows()));
123+
String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
124+
queryResults.getTotalRows());
125+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
126+
error, error, ErrorType.SYSTEM, true, null);
101127
}
102128

103129
Schema schema = queryResults.getSchema();

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import io.cdap.cdap.api.annotation.Description;
2828
import io.cdap.cdap.api.annotation.Macro;
2929
import io.cdap.cdap.api.annotation.Name;
30+
import io.cdap.cdap.api.exception.ErrorCategory;
31+
import io.cdap.cdap.api.exception.ErrorType;
32+
import io.cdap.cdap.api.exception.ErrorUtils;
3033
import io.cdap.cdap.etl.api.FailureCollector;
3134
import io.cdap.plugin.common.ConfigUtil;
3235
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
@@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map<String, String> argumentCo
224227
String nonExistingColumnNames = argumentConditionMap.keySet().stream()
225228
.filter(columnName -> !argumentConditionFields.containsKey(columnName))
226229
.collect(Collectors.joining(" ,"));
227-
throw new RuntimeException(String.format(
230+
String error = String.format(
228231
"Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.",
229-
nonExistingColumnNames));
232+
nonExistingColumnNames);
233+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
234+
error, error, ErrorType.USER, true, null);
230235
}
231236

232237
static void checkIfArgumentsColumnsListExistsInSource(

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import io.cdap.cdap.api.annotation.Macro;
4545
import io.cdap.cdap.api.annotation.Name;
4646
import io.cdap.cdap.api.annotation.Plugin;
47+
import io.cdap.cdap.api.exception.ErrorCategory;
48+
import io.cdap.cdap.api.exception.ErrorType;
49+
import io.cdap.cdap.api.exception.ErrorUtils;
4750
import io.cdap.cdap.etl.api.FailureCollector;
4851
import io.cdap.cdap.etl.api.action.Action;
4952
import io.cdap.cdap.etl.api.action.ActionContext;
@@ -52,6 +55,7 @@
5255
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
5356
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
5457
import io.cdap.plugin.gcp.common.CmekUtils;
58+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
5559
import io.cdap.plugin.gcp.common.GCPUtils;
5660
import org.slf4j.Logger;
5761
import org.slf4j.LoggerFactory;
@@ -93,7 +97,7 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
9397
}
9498

9599
@Override
96-
public void run(ActionContext context) throws Exception {
100+
public void run(ActionContext context) {
97101
FailureCollector collector = context.getFailureCollector();
98102
config.validate(collector, context.getArguments().asMap());
99103
QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(config.getSql());
@@ -125,9 +129,16 @@ public void run(ActionContext context) throws Exception {
125129
builder.setUseLegacySql(config.isLegacySQL());
126130

127131
// API request - starts the query.
128-
Credentials credentials = config.getServiceAccount() == null ?
129-
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
130-
config.isServiceAccountFilePath());
132+
Credentials credentials = null;
133+
try {
134+
credentials = config.getServiceAccount() == null ?
135+
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
136+
config.isServiceAccountFilePath());
137+
} catch (IOException e) {
138+
collector.addFailure(String.format("Failed to load service account credentials: %s", e.getMessage()), null)
139+
.withStacktrace(e.getStackTrace());
140+
context.getFailureCollector().getOrThrowException();
141+
}
131142
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout());
132143
//create dataset to store the results if not exists
133144
if (config.getStoreResults() && !Strings.isNullOrEmpty(datasetName) &&
@@ -152,23 +163,35 @@ public void run(ActionContext context) throws Exception {
152163
try {
153164
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
154165
} catch (Throwable e) {
155-
throw new RuntimeException(e);
166+
if (e instanceof Exception) {
167+
String error =
168+
String.format("Failed to execute query with exponential backoff with message: %s", e.getMessage());
169+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain((Exception) e, error,
170+
ErrorType.USER, true, GCPUtils.BQ_SUPPORTED_DOC_URL);
171+
}
172+
String errorMessage = String.format("Failed to execute query with exponential backoff with message: %s",
173+
e.getMessage());
174+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
175+
errorMessage, errorMessage, ErrorType.UNKNOWN, true, null);
156176
}
157177
} else {
158-
executeQuery(bigQuery, queryConfig, context);
178+
try {
179+
executeQuery(bigQuery, queryConfig, context);
180+
} catch (Exception e) {
181+
String errorMessage = String.format("Failed to execute query with message: %s", e.getMessage());
182+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorMessage, ErrorType.UNKNOWN,
183+
true, GCPUtils.BQ_SUPPORTED_DOC_URL);
184+
}
159185
}
160186
}
161187

162-
protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
163-
QueryJobConfiguration queryConfig, ActionContext context)
164-
throws Throwable {
188+
void executeQueryWithExponentialBackoff(BigQuery bigQuery,
189+
QueryJobConfiguration queryConfig, ActionContext context) throws Throwable {
165190
try {
166191
Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
167192
} catch (FailsafeException e) {
168-
if (e.getCause() != null) {
169-
throw e.getCause();
170-
}
171-
throw e;
193+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, e.getCause().getMessage(),
194+
ErrorType.SYSTEM, true, GCPUtils.BQ_SUPPORTED_DOC_URL);
172195
}
173196
}
174197

@@ -203,7 +226,9 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
203226
if (RETRY_ON_REASON.contains(e.getError().getReason())) {
204227
throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
205228
}
206-
throw new RuntimeException(e);
229+
String error = String.format("Failed to execute query with message: %s", e.getMessage());
230+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.UNKNOWN,
231+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
207232
}
208233

209234
// Check for errors
@@ -214,7 +239,9 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
214239
if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) {
215240
throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
216241
}
217-
throw new RuntimeException(queryJob.getStatus().getError().getMessage());
242+
String error = queryJob.getStatus().getError().getMessage();
243+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
244+
error, error, ErrorType.UNKNOWN, false, null);
218245
}
219246

220247
TableResult queryResults = queryJob.getQueryResults();

0 commit comments

Comments
 (0)