diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
index 6d958aad7b..b9fc110a68 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
@@ -57,22 +57,26 @@
 import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
 import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputFormat;
 import com.google.cloud.hadoop.util.ConfigurationUtil;
-import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
 import com.google.cloud.hadoop.util.ResilientOperation;
 import com.google.cloud.hadoop.util.RetryDeterminer;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,6 +141,44 @@ public OutputCommitter createCommitter(TaskAttemptContext context) throws IOExce
     return new BigQueryOutputCommitter(context, delegateCommitter);
   }
 
+  /**
+   * This method is copied from
+   * {@link ForwardingBigQueryFileOutputFormat#checkOutputSpecs(JobContext)} to override
+   * {@link BigQueryFactory} with {@link BigQueryFactoryWithScopes}.
+   */
+  @Override
+  public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
+    Configuration conf = job.getConfiguration();
+
+    // Validate the output configuration.
+    BigQueryOutputConfiguration.validateConfiguration(conf);
+
+    // Get the output path.
+    Path outputPath = BigQueryOutputConfiguration.getGcsOutputPath(conf);
+    LOG.info("Using output path '{}'.", outputPath);
+
+    // Error if the output path already exists.
+    FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+    if (outputFileSystem.exists(outputPath)) {
+      throw new IOException("The output path '" + outputPath + "' already exists.");
+    }
+
+    // Error if compression is set as there's mixed support in BigQuery.
+    if (FileOutputFormat.getCompressOutput(job)) {
+      throw new IOException("Compression isn't supported for this OutputFormat.");
+    }
+
+    // Error if unable to create a BigQuery helper.
+    try {
+      new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
+    } catch (GeneralSecurityException gse) {
+      throw new IOException("Failed to create BigQuery client", gse);
+    }
+
+    // Let delegate process its checks.
+    getDelegate(conf).checkOutputSpecs(job);
+  }
+
   /**
    * BigQuery Output committer.
    */
@@ -158,7 +200,7 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
     BigQueryOutputCommitter(TaskAttemptContext context, OutputCommitter delegate) throws IOException {
       super(context, delegate);
       try {
-        BigQueryFactory bigQueryFactory = new BigQueryFactory();
+        BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
         this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
       } catch (GeneralSecurityException e) {
         throw new IOException("Failed to create Bigquery client.", e);
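
// Illustrative sketch, assuming `conf` was already populated the way the plugin normally
// does it (table, schema, GCS staging path): Hadoop calls OutputFormat#checkOutputSpecs(job)
// once at submit time, so with the override above a bad credential, a pre-existing GCS
// staging path, or compressed output fails before any task launches. The method name
// `submitSketch` is hypothetical.
static void submitSketch(org.apache.hadoop.conf.Configuration conf) throws Exception {
  org.apache.hadoop.mapreduce.Job job =
      org.apache.hadoop.mapreduce.Job.getInstance(conf, "bq-sink-sketch");
  job.setOutputFormatClass(BigQueryOutputFormat.class);
  // Submission invokes checkOutputSpecs(job) first, running the scoped-factory check.
  job.waitForCompletion(true);
}
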
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java
index 386012fb2d..9e2af569ba 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java
@@ -19,14 +19,15 @@
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
+import com.google.cloud.hadoop.util.AccessTokenProvider;
 import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
 import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
 import com.google.common.collect.ImmutableList;
+import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.security.GeneralSecurityException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -43,9 +44,7 @@ public BigQueryFactoryWithScopes(List<String> scopes) {
   @Override
   public Credential createBigQueryCredential(Configuration config) throws GeneralSecurityException, IOException {
     Credential credential =
-        CredentialFromAccessTokenProviderClassFactory.credential(
-            config,
-            Collections.singletonList(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX),
+        CredentialFromAccessTokenProviderClassFactory.credential(getAccessTokenProvider(config),
             scopes);
     if (credential != null) {
       return credential;
     }
@@ -53,6 +52,19 @@ public Credential createBigQueryCredential(Configuration config) throws GeneralS
     return HadoopCredentialConfiguration.getCredentialFactory(
         config, String.valueOf(ImmutableList.of(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX)))
-      .getCredential(BIGQUERY_OAUTH_SCOPES);
+      .getCredential(scopes);
+  }
+
+  /**
+   * Returns the {@link AccessTokenProvider} that uses the newer GoogleCredentials
+   * library to get the credentials.
+   *
+   * @param config Hadoop {@link Configuration}
+   * @return {@link ServiceAccountAccessTokenProvider}
+   */
+  private AccessTokenProvider getAccessTokenProvider(Configuration config) {
+    AccessTokenProvider accessTokenProvider = new ServiceAccountAccessTokenProvider();
+    accessTokenProvider.setConf(config);
+    return accessTokenProvider;
+  }
 }
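
// Illustrative sketch of the credential flow above, assuming GCPUtils.BIGQUERY_SCOPES as
// the scope list: the primary path wraps ServiceAccountAccessTokenProvider (backed by
// GoogleCredentials), and the fallback path now receives the caller's scopes instead of
// the hard-coded BIGQUERY_OAUTH_SCOPES. The method name `scopedCredentialSketch` is
// hypothetical.
static Credential scopedCredentialSketch(Configuration conf)
    throws IOException, GeneralSecurityException {
  BigQueryFactory factory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
  return factory.createBigQueryCredential(conf); // scoped on both the provider and fallback paths
}
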
diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
index f4f2bc6af7..f31264004f 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -172,12 +172,13 @@ private static Map generateAuthProperties(@Nullable String servi
     // AccessTokenProviderClassFromConfigFactory will by default look for
     // google.cloud.auth.access.token.provider.impl
     // but can be configured to also look for the conf with other prefixes like
-    // gs.fs.auth.access.token.provider.impl
+    // fs.gs.auth.access.token.provider.impl
     // mapred.bq.auth.access.token.provider.impl
     // for use by GCS and BQ.
     for (String prefix : prefixes) {
-      properties.put(prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
-                     ServiceAccountAccessTokenProvider.class.getName());
+      properties.put(
+          prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX.getKey(),
+          ServiceAccountAccessTokenProvider.class.getName());
     }
     return properties;
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
index 78f7c128ca..35b96d57a5 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
@@ -56,6 +56,12 @@ public void refresh() throws IOException {
 
   private GoogleCredentials getCredentials() throws IOException {
     if (credentials == null) {
+      if (conf == null) {
+        // {@link CredentialFromAccessTokenProviderClassFactory#credential} does not propagate the
+        // config to {@link ServiceAccountAccessTokenProvider}, which causes an NPE when
+        // initializing {@link ForwardingBigQueryFileOutputCommitter} because conf is null.
+        conf = new Configuration();
+      }
       credentials = GCPUtils.loadCredentialsFromConf(conf);
     }
     return credentials;
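
// Net effect of the GCPUtils change above, written out by hand. The keys are the ones
// named in the patch comment; ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX is a
// HadoopConfigurationProperty in the newer hadoop-connectors, hence the .getKey() call:
Configuration conf = new Configuration();
conf.set("fs.gs.auth.access.token.provider.impl",
         ServiceAccountAccessTokenProvider.class.getName());
conf.set("mapred.bq.auth.access.token.provider.impl",
         ServiceAccountAccessTokenProvider.class.getName());
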
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
new file mode 100644
index 0000000000..7a54a3bc2e
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
+import com.google.cloud.hadoop.util.AccessTokenProvider;
+import com.google.cloud.hadoop.util.CredentialFactory;
+import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
+import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
+import com.google.common.collect.ImmutableList;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link ServiceAccountAccessTokenProvider}.
+ */
+public class ServiceAccountAccessTokenProviderTest {
+
+  @Test
+  public void testServiceAccountAccessTokenProviderIsUsed() throws IOException {
+    Map<String, String> authProperties = GCPUtils.generateGCSAuthProperties(null, "filePath");
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> prop : authProperties.entrySet()) {
+      conf.set(prop.getKey(), prop.getValue());
+    }
+
+    AccessTokenProvider accessTokenProvider =
+      HadoopCredentialConfiguration.getAccessTokenProvider(conf, ImmutableList.of(
+        GoogleHadoopFileSystemConfiguration.GCS_CONFIG_PREFIX));
+
+    Assert.assertTrue(String.format("AccessTokenProvider should be an instance of %s",
+                                    ServiceAccountAccessTokenProvider.class.getName()),
+                      accessTokenProvider instanceof ServiceAccountAccessTokenProvider);
+  }
+
+  @Test
+  public void testServiceAccountAccessTokenProvider() throws IOException {
+    Map<String, String> authProperties = GCPUtils.generateGCSAuthProperties(null, "filePath");
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> prop : authProperties.entrySet()) {
+      conf.set(prop.getKey(), prop.getValue());
+    }
+
+    // {@link CredentialFromAccessTokenProviderClassFactory#credential} does not propagate the
+    // config to {@link ServiceAccountAccessTokenProvider}; this must not cause an NPE.
+    Credential credential = CredentialFromAccessTokenProviderClassFactory.credential(
+      conf, ImmutableList.of(GoogleHadoopFileSystemConfiguration.GCS_CONFIG_PREFIX),
+      CredentialFactory.DEFAULT_SCOPES
+    );
+    Assert.assertNotNull(credential);
+  }
+}
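
// Illustrative sketch of the conf-null guard that the tests above exercise: when the
// hadoop-connectors factory instantiates the provider reflectively without calling
// setConf(...), the provider now falls back to `new Configuration()` instead of throwing
// an NPE. Assumes application-default credentials are resolvable when a token is
// actually fetched.
ServiceAccountAccessTokenProvider provider = new ServiceAccountAccessTokenProvider();
// no provider.setConf(conf) here, mirroring the factory's behavior
AccessTokenProvider.AccessToken token = provider.getAccessToken(); // previously an NPE
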