Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1818] Fix error messages in BQ Output Format #1516

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,30 +170,24 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
// Error if the output path already exists.
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
if (outputFileSystem.exists(outputPath)) {
String errorMessage = String.format("The output path '%s' already exists.", outputPath);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
throw new IOException("The output path '" + outputPath + "' already exists.");
}

// Error if compression is set as there's mixed support in BigQuery.
if (FileOutputFormat.getCompressOutput(job)) {
String errorMessage = "Compression isn't supported for this OutputFormat.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage));
String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage),
ErrorType.SYSTEM, true, new IOException(errorMessage));
}

// Error if unable to create a BigQuery helper.
try {
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
} catch (GeneralSecurityException gse) {
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, String.format(errorMessageFormat,
ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, gse);
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
gse.getClass().getName(), gse.getMessage()), gse);
}

// Let delegate process its checks.
Expand Down Expand Up @@ -223,11 +217,9 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
try {
BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
} catch (GeneralSecurityException e) {
String errorMessage = "Failed to create BigQuery client";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
} catch (GeneralSecurityException gse) {
throw new IOException(String.format("Failed to create BigQuery client %s: %s",
gse.getClass().getName(), gse.getMessage()), gse);
}
}

Expand Down Expand Up @@ -285,10 +277,8 @@ public void commitJob(JobContext jobContext) throws IOException {
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf);
} catch (Exception e) {
String errorMessage = "Failed to import GCS into BigQuery.";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
throw new IOException(String.format("Failed to import GCS into BigQuery %s: %s.",
e.getClass().getName(), e.getMessage()), e);
}

cleanup(jobContext);
Expand Down Expand Up @@ -597,25 +587,25 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
numOfErrors = errors.size();
}
// Only add first error message in the exception. For other errors user should look at BigQuery job logs.
String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." +
String errorMessageException = String.format(
"Error occurred while importing data to BigQuery '%s'." +
" There are total %s error(s) for BigQuery job %s. Please look at " +
"BigQuery job logs for more information.",
errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true,
new IOException(errorMessageException));

String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException),
ErrorType.UNKNOWN, true, null);
}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId()
, elapsedTime);
String errorMessage = String.format("Job %s failed to complete after %s millis.",
jobReference.getJobId(), elapsedTime);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true,
new IOException(errorMessage));
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
ErrorType.UNKNOWN, true, null);
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
Expand Down Expand Up @@ -655,7 +645,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, e);
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage),
ErrorType.SYSTEM, true, e);
}
}
return Optional.empty();
Expand Down
Loading