Skip to content

Commit

Permalink
Fix error messages in BQ Output Format
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Feb 11, 2025
1 parent d451af6 commit 2295d5a
Showing 1 changed file with 20 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,30 +170,24 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
// Error if the output path already exists.
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
if (outputFileSystem.exists(outputPath)) {
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
throw new IOException("The output path '" + outputPath + "' already exists.");
}

// Error if compression is set as there's mixed support in BigQuery.
if (FileOutputFormat.getCompressOutput(job)) {
String errorMessage = "Compression isn't supported for this OutputFormat.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage),
ErrorType.SYSTEM, true, new IOException(errorMessage));
}

// Error if unable to create a BigQuery helper.
try {
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
} catch (GeneralSecurityException gse) {
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat,
ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse);
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
gse.getClass().getName(), gse.getMessage()), gse);
}

// Let delegate process its checks.
Expand Down Expand Up @@ -223,11 +217,9 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
try {
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
} catch (GeneralSecurityException e) {
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
} catch (GeneralSecurityException gse) {
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
gse.getClass().getName(), gse.getMessage()), gse);
}
}

Expand Down Expand Up @@ -285,10 +277,8 @@ public void commitJob(JobContext jobContext) throws IOException {
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
} catch (Exception e) {
String errorMessage = "Failed to import GCS into BigQuery.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
throw new IOException(String.format("Failed to import GCS into BigQuery %s: %s.",
e.getClass().getName(), e.getMessage()), e);
}

cleanup(jobContext);
Expand Down Expand Up @@ -597,25 +587,25 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
numOfErrors = errors.size();
}
// Only add first error message in the exception. For other errors user should look at BigQuery job logs.
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
String errorMessageException = String.format(
"Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
new IOException(errorMessageException));

String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException),
ErrorType.UNKNOWN, true, null);
}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId()
, elapsedTime);
String errorMessage = String.format("Job %s failed to complete after %s millis.",
jobReference.getJobId(), elapsedTime);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
new IOException(errorMessage));
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
ErrorType.UNKNOWN, true, null);
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
Expand Down Expand Up @@ -655,7 +645,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
ErrorType.SYSTEM, true, e);
}
}
return Optional.empty();
Expand Down

0 comments on commit 2295d5a

Please sign in to comment.