Skip to content

Commit

Permalink
Merge branch 'data-integrations:develop' into GoogleBigQSource
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveena2607 authored Jan 13, 2025
2 parents 8f29e31 + 60f3ba0 commit be1e5b6
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,25 @@ public void verifyTheBigQueryValidationErrorMessageForInvalidProperty(String pro
expectedErrorMessage = PluginPropertyUtils
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_CHUNKSIZE);
} else if (property.equalsIgnoreCase("bucket")) {
String propertyValue = PluginPropertyUtils.pluginProp("bqInvalidTemporaryBucket");
expectedErrorMessage = PluginPropertyUtils
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_TEMPORARY_BUCKET);
.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_TEMPORARY_BUCKET)
.replace("VALUE", propertyValue);
} else if (property.equalsIgnoreCase("table")) {
String propertyValue = PluginPropertyUtils.pluginProp("bqInvalidSinkTable");
expectedErrorMessage = PluginPropertyUtils
.errorProp(E2ETestConstants.ERROR_MSG_INCORRECT_TABLE_NAME);
.errorProp(E2ETestConstants.ERROR_MSG_INCORRECT_TABLE_NAME)
.replace("VALUE", propertyValue);
} else if (property.equalsIgnoreCase("dataset")) {
String propertyValue = PluginPropertyUtils.pluginProp("bqInvalidSinkDataset");
expectedErrorMessage = PluginPropertyUtils
.errorProp(E2ETestConstants.ERROR_MSG_INCORRECT_DATASET_NAME)
.replace("VALUE", propertyValue);
} else {
String propertyValue = PluginPropertyUtils.pluginProp("bqInvalidPropertyValue");
expectedErrorMessage = PluginPropertyUtils.errorProp(E2ETestConstants.ERROR_MSG_BQ_INCORRECT_PROPERTY).
replaceAll("PROPERTY", property.substring(0, 1).toUpperCase() + property.substring(1));
replaceAll("PROPERTY", property.substring(0, 1).toUpperCase() + property.substring(1))
.replace("VALUE", propertyValue);
}
String actualErrorMessage = PluginPropertyUtils.findPropertyErrorElement(property).getText();
Assert.assertEquals(expectedErrorMessage, actualErrorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* E2E Test constants.
*/
public class E2ETestConstants {
public static final String ERROR_MSG_INCORRECT_DATASET_NAME = "errorMessageIncorrectDatasetName";
public static final String ERROR_MSG_GCS_INVALID_PATH = "errorMessageGCSInvalidPath";
public static final String ERROR_MSG_GCS_INVALID_BUCKET_NAME = "errorMessageGCSInvalidBucketName";
public static final String ERROR_MSG_INCORRECT_TABLE = "errorMessageIncorrectBQTable";
Expand Down
7 changes: 4 additions & 3 deletions src/e2e-test/resources/errorMessage.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ errorMessagePubSubDelayThresholdField=Invalid delay threshold for publishing a b
errorMessagePubSubRetryTimeout=Invalid max retry timeout for retrying failed publish. Ensure the value is a positive number.
errorMessagePubSubErrorThreshold=Invalid error threshold for publishing. Ensure the value is a positive number.
errorMessageIncorrectBQChunkSize=Value must be a multiple of 262144.
errorMessageIncorrectBQBucketName=Bucket name can only contain lowercase letters, numbers, '.', '_', and '-'.
errorMessageIncorrectBQTableName=Table name can only contain letters (lower or uppercase), numbers, '_' and '-'.
errorMessageIncorrectBQProperty=PROPERTY name can only contain letters (lower or uppercase), numbers and '_'.
errorMessageIncorrectBQBucketName=Bucket name 'VALUE' can only contain lowercase letters, numbers, '.', '_', and '-'.
errorMessageIncorrectBQTableName=Table name 'VALUE' can only contain letters (lower or uppercase), numbers, '_' and '-'.
errorMessageIncorrectBQProperty=PROPERTY name 'VALUE' can only contain letters (lower or uppercase), numbers and '_'.
errorMessageIncorrectDatasetName=Dataset name 'VALUE' can only contain letters (lower or uppercase), numbers and '_'.
errorMessageInvalidPath=Error when trying to detect schema: Input path not found
errorMessageBQExecuteTableDataset=Dataset and table must be specified together.
errorMessageIncorrectMinimumSplitSize=Unable to create config for batchsource GCSFile 'minSplitSize' is invalid: Value of field class io.cdap.plugin.gcp.gcs.source.GCSSource$GCSSourceConfig.minSplitSize is expected to be a number.
Expand Down
1 change: 1 addition & 0 deletions src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ bqChunkSize=262144
bqInvalidChunkSize=26214
bqInvalidSinkDataset=$#%$
bqInvalidSinkTable=(*^*&*
bqInvalidPropertyValue=$#%^&
bqInvalidTemporaryBucket=$#%$
bqInvalidRefName=invalidRef&^*&&*
bqDatatypeChange1=[{"key":"Id","value":"long"},{"key":"Value","value":"long"}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
Expand All @@ -43,6 +46,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,18 +94,32 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
String project = config.getProject();
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
FailureCollector collector = context.getFailureCollector();
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
baseConfiguration = getBaseConfiguration(cmekKeyName);

// Get required dataset ID and dataset instance (if it exists)
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
Dataset dataset = bigQuery.getDataset(datasetId);
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
dataset = bigQuery.getDataset(datasetId);
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}

// Get the required bucket name and bucket instance (if it exists)
Storage storage = GCPUtils.getStorage(project, credentials);
Storage storage;
try {
storage = GCPUtils.getStorage(project, credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
dataset, config.getBucket());
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.BQ_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.BQ_SUPPORTED_DOC_URL, e);
}
Expand Down Expand Up @@ -249,8 +251,10 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.GCS_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.GCS_SUPPORTED_DOC_URL, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -48,7 +49,9 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -60,6 +63,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
Expand Down Expand Up @@ -147,9 +151,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
BigQuery bigQuery;
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}

// Get Configuration for this run
bucketPath = UUID.randomUUID().toString();
Expand All @@ -169,10 +180,18 @@ public void prepareRun(BatchSourceContext context) throws Exception {
dataset, config.getBucket());

// Configure GCS Bucket to use
Storage storage;
try {
storage = GCPUtils.getStorage(config.getProject(), credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
String bucket = null;
try {
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
cmekKeyName);
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
bucketPath, cmekKeyName);
} catch (Exception e) {
String errorReason = "Failed to create bucket.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";

/**
* Get a ProgramFailureException with the given error
Expand Down Expand Up @@ -71,12 +72,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

String errorMessage = e.getMessage();
String externalDocumentationLink = null;
Expand All @@ -95,7 +96,8 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage),
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
externalDocumentationLink, e);
}
Expand All @@ -122,11 +124,12 @@ private String getErrorMessage(GoogleJsonResponseException exception) {
* @param e The IllegalArgumentException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
}

/**
Expand All @@ -136,11 +139,12 @@ private ProgramFailureException getProgramFailureException(IllegalArgumentExcept
* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalStateException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
}

/**
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand Down Expand Up @@ -138,10 +141,14 @@ public void prepareRun(BatchSinkContext context) throws Exception {
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Bucket bucket;
String location = null;
try {
Expand All @@ -153,6 +160,9 @@ public void prepareRun(BatchSinkContext context) throws Exception {
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
Expand Down
Loading

0 comments on commit be1e5b6

Please sign in to comment.