44
44
import io .cdap .cdap .api .annotation .Macro ;
45
45
import io .cdap .cdap .api .annotation .Name ;
46
46
import io .cdap .cdap .api .annotation .Plugin ;
47
+ import io .cdap .cdap .api .exception .ErrorCategory ;
48
+ import io .cdap .cdap .api .exception .ErrorType ;
49
+ import io .cdap .cdap .api .exception .ErrorUtils ;
47
50
import io .cdap .cdap .etl .api .FailureCollector ;
48
51
import io .cdap .cdap .etl .api .action .Action ;
49
52
import io .cdap .cdap .etl .api .action .ActionContext ;
50
53
import io .cdap .cdap .etl .common .Constants ;
54
+ import io .cdap .plugin .gcp .bigquery .common .BigQueryErrorUtil ;
51
55
import io .cdap .plugin .gcp .bigquery .exception .BigQueryJobExecutionException ;
52
56
import io .cdap .plugin .gcp .bigquery .sink .BigQuerySinkUtils ;
53
57
import io .cdap .plugin .gcp .bigquery .util .BigQueryUtil ;
54
58
import io .cdap .plugin .gcp .common .CmekUtils ;
59
+ import io .cdap .plugin .gcp .common .GCPErrorDetailsProviderUtil ;
55
60
import io .cdap .plugin .gcp .common .GCPUtils ;
56
61
import org .slf4j .Logger ;
57
62
import org .slf4j .LoggerFactory ;
@@ -93,7 +98,7 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
93
98
}
94
99
95
100
@ Override
96
- public void run (ActionContext context ) throws Exception {
101
+ public void run (ActionContext context ) {
97
102
FailureCollector collector = context .getFailureCollector ();
98
103
config .validate (collector , context .getArguments ().asMap ());
99
104
QueryJobConfiguration .Builder builder = QueryJobConfiguration .newBuilder (config .getSql ());
@@ -125,9 +130,16 @@ public void run(ActionContext context) throws Exception {
125
130
builder .setUseLegacySql (config .isLegacySQL ());
126
131
127
132
// API request - starts the query.
128
- Credentials credentials = config .getServiceAccount () == null ?
129
- null : GCPUtils .loadServiceAccountCredentials (config .getServiceAccount (),
130
- config .isServiceAccountFilePath ());
133
+ Credentials credentials = null ;
134
+ try {
135
+ credentials = config .getServiceAccount () == null ?
136
+ null : GCPUtils .loadServiceAccountCredentials (config .getServiceAccount (),
137
+ config .isServiceAccountFilePath ());
138
+ } catch (IOException e ) {
139
+ collector .addFailure (String .format ("Failed to load service account credentials, %s: %s" ,
140
+ e .getClass ().getName (), e .getMessage ()), null ).withStacktrace (e .getStackTrace ());
141
+ collector .getOrThrowException ();
142
+ }
131
143
BigQuery bigQuery = GCPUtils .getBigQuery (config .getProject (), credentials , config .getReadTimeout ());
132
144
//create dataset to store the results if not exists
133
145
if (config .getStoreResults () && !Strings .isNullOrEmpty (datasetName ) &&
@@ -152,23 +164,46 @@ public void run(ActionContext context) throws Exception {
152
164
try {
153
165
executeQueryWithExponentialBackoff (bigQuery , queryConfig , context );
154
166
} catch (Throwable e ) {
155
- throw new RuntimeException (e );
167
+ String errorMessage = String .format (
168
+ "Failed to execute query with exponential backoff, %s: %s" , e .getClass ().getName (),
169
+ e .getMessage ());
170
+ if (e instanceof BigQueryException ) {
171
+ throw BigQueryErrorUtil .getProgramFailureException (errorMessage ,
172
+ ((BigQueryException ) e ).getReason (), (Exception ) e );
173
+ }
174
+ throw ErrorUtils .getProgramFailureException (
175
+ new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ), errorMessage , errorMessage ,
176
+ ErrorType .UNKNOWN , true , e );
156
177
}
157
178
} else {
158
- executeQuery (bigQuery , queryConfig , context );
179
+ try {
180
+ executeQuery (bigQuery , queryConfig , context );
181
+ } catch (Exception e ) {
182
+ String errorMessage = String .format ("Failed to execute query, %s: %s" ,
183
+ e .getClass ().getName (), e .getMessage ());
184
+ String errorReason = null ;
185
+ if (e instanceof BigQueryException ) {
186
+ errorReason = ((BigQueryException ) e ).getReason ();
187
+ }
188
+ throw BigQueryErrorUtil .getProgramFailureException (errorMessage , errorReason , e );
189
+ }
159
190
}
160
191
}
161
192
162
193
protected void executeQueryWithExponentialBackoff (BigQuery bigQuery ,
163
- QueryJobConfiguration queryConfig , ActionContext context )
164
- throws Throwable {
194
+ QueryJobConfiguration queryConfig , ActionContext context ) {
165
195
try {
166
196
Failsafe .with (getRetryPolicy ()).run (() -> executeQuery (bigQuery , queryConfig , context ));
167
197
} catch (FailsafeException e ) {
198
+ String errorReason = String .format ("Failed to execute query with message: %s" ,
199
+ e .getMessage ());
168
200
if (e .getCause () != null ) {
169
- throw e .getCause ();
201
+ errorReason = String .format ("Failed to execute query with message: %s" ,
202
+ e .getCause ().getMessage ());
170
203
}
171
- throw e ;
204
+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (
205
+ e == null ? e : e .getCause (), errorReason , ErrorType .UNKNOWN , true ,
206
+ GCPUtils .BQ_SUPPORTED_DOC_URL );
172
207
}
173
208
}
174
209
@@ -185,7 +220,7 @@ private RetryPolicy<Object> getRetryPolicy() {
185
220
}
186
221
187
222
private void executeQuery (BigQuery bigQuery , QueryJobConfiguration queryConfig , ActionContext context )
188
- throws InterruptedException , BigQueryJobExecutionException {
223
+ throws BigQueryJobExecutionException {
189
224
// Location must match that of the dataset(s) referenced in the query.
190
225
JobId jobId = JobId .newBuilder ().setRandomJob ().setLocation (config .getLocation ()).build ();
191
226
Job queryJob ;
@@ -198,12 +233,22 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
198
233
199
234
// Wait for the query to complete
200
235
queryJob = queryJob .waitFor ();
201
- } catch (BigQueryException e ) {
202
- LOG .error ("The query job {} failed. Error: {}" , jobId .getJob (), e .getError ().getMessage ());
203
- if (RETRY_ON_REASON .contains (e .getError ().getReason ())) {
204
- throw new BigQueryJobExecutionException (e .getError ().getMessage (), e );
236
+ } catch (BigQueryException | InterruptedException e ) {
237
+ String errorMessage = String .format ("Failed to execute query, %s: %s" , e .getClass ().getName (),
238
+ e .getMessage ());
239
+ if (e instanceof BigQueryException ) {
240
+ LOG .error ("The query job {} failed. Error: {}" , jobId .getJob (),
241
+ ((BigQueryException ) e ).getError ().getMessage ());
242
+ if (RETRY_ON_REASON .contains (((BigQueryException ) e ).getError ().getReason ())) {
243
+ throw new BigQueryJobExecutionException (((BigQueryException ) e ).getError ().getMessage (),
244
+ e );
245
+ }
246
+ throw BigQueryErrorUtil .getProgramFailureException (errorMessage ,
247
+ ((BigQueryException ) e ).getReason (), e );
205
248
}
206
- throw new RuntimeException (e );
249
+ throw ErrorUtils .getProgramFailureException (
250
+ new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ), errorMessage , errorMessage ,
251
+ ErrorType .UNKNOWN , true , e );
207
252
}
208
253
209
254
// Check for errors
@@ -214,10 +259,29 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
214
259
if (RETRY_ON_REASON .contains (queryJob .getStatus ().getError ().getReason ())) {
215
260
throw new BigQueryJobExecutionException (queryJob .getStatus ().getError ().getMessage ());
216
261
}
217
- throw new RuntimeException (queryJob .getStatus ().getError ().getMessage ());
262
+ String error = String .format ("Failed to execute query with reason: %s and message: %s" ,
263
+ queryJob .getStatus ().getError ().getReason (),
264
+ queryJob .getStatus ().getError ().getMessage ());
265
+ ErrorType type = BigQueryErrorUtil .getErrorType (queryJob .getStatus ().getError ().getReason ());
266
+ throw ErrorUtils .getProgramFailureException (
267
+ new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ), error , error , type , true ,
268
+ null );
218
269
}
219
270
220
- TableResult queryResults = queryJob .getQueryResults ();
271
+ TableResult queryResults ;
272
+ try {
273
+ queryResults = queryJob .getQueryResults ();
274
+ } catch (BigQueryException | InterruptedException e ) {
275
+ String errorMessage = String .format ("Failed to retrieve query result, %s: %s" ,
276
+ e .getClass ().getName (), e .getMessage ());
277
+ if (e instanceof BigQueryException ) {
278
+ throw BigQueryErrorUtil .getProgramFailureException (errorMessage ,
279
+ ((BigQueryException ) e ).getReason (), e );
280
+ }
281
+ throw ErrorUtils .getProgramFailureException (
282
+ new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ), errorMessage , errorMessage ,
283
+ ErrorType .UNKNOWN , true , e );
284
+ }
221
285
long rows = queryResults .getTotalRows ();
222
286
223
287
if (config .shouldSetAsArguments ()) {
@@ -659,11 +723,12 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
659
723
bigQuery .create (JobInfo .of (queryJobConfiguration ));
660
724
} catch (BigQueryException e ) {
661
725
final String errorMessage ;
662
- if (e .getCode () == ERROR_CODE_NOT_FOUND ) {
663
- errorMessage = String .format ("Resource was not found. Please verify the resource name. If the resource " +
664
- "will be created at runtime, then update to use a macro for the resource name. Error message received " +
665
- "was: %s" , e .getMessage ());
666
- } else {
726
+ if (e .getCode () == ERROR_CODE_NOT_FOUND ) {
727
+ errorMessage = String .format (
728
+ "Resource was not found. Please verify the resource name. If the resource will be "
729
+ + "created at runtime, then update to use a macro for the resource name. "
730
+ + "Error message received was %s, %s" , e .getClass ().getName (), e .getMessage ());
731
+ } else {
667
732
errorMessage = e .getMessage ();
668
733
}
669
734
failureCollector .addFailure (String .format ("%s. Error code: %s." , errorMessage , e .getCode ()),
0 commit comments