diff --git a/build.sbt b/build.sbt index 8f990bb2e8..cbcda63ef9 100644 --- a/build.sbt +++ b/build.sbt @@ -837,6 +837,10 @@ lazy val `scio-core` = project "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Provided, "org.apache.beam" % s"beam-runners-spark-$sparkMajorVersion" % beamVersion % Provided, "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion % Provided, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided, + "com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$bigdataossVersion" % Provided, + "com.google.cloud.bigdataoss" % "gcsio" % bigdataossVersion % Provided, + "com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion" % Provided, // test "com.lihaoyi" %% "fansi" % fansiVersion % Test, "com.lihaoyi" %% "pprint" % pprintVersion % Test, diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index ce3e169b6a..e429491a74 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -48,6 +48,7 @@ import scala.collection.mutable.{Buffer => MBuffer} import scala.concurrent.duration.Duration import scala.io.Source import scala.reflect.ClassTag +import scala.util.chaining._ import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions @@ -454,6 +455,87 @@ class ScioContext private[scio] ( o.setScioVersion(BuildInfo.version) } + { + import org.apache.hadoop.conf.Configuration + import com.google.cloud.hadoop.fs.gcs.{GoogleHadoopFileSystemConfiguration => GfsConfig} + import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions + + try { + // If Hadoop is on the classpath, try to parse default gcs-connector options + val config = new Configuration() + val o = optionsAs[GcsOptions] + + o.setGoogleCloudStorageReadOptions( + GoogleCloudStorageReadOptions + .builder() + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setFastFailOnNotFound) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setSupportGzipEncoding) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.getKey)) + .map(_.toLong) + .fold(o)(o.setInplaceSeekLimit) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_FADVISE.getKey)) + .map(GoogleCloudStorageReadOptions.Fadvise.valueOf) + .fold(o)(o.setFadvise) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.getKey)) + .map(_.toInt) + .fold(o)(o.setMinRangeRequestSize) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_CHECKSUMS_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setGrpcChecksumsEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadMessageTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_METADATA_TIMEOUT_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setGrpcReadMetadataTimeoutMillis) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_GRPC_READ_ZEROCOPY_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setGrpcReadZeroCopyEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_TRACE_LOG_ENABLE.getKey)) + .map(_.toBoolean) + .fold(o)(o.setTraceLogEnabled) + ) + .pipe(o => + Option(config.get(GfsConfig.GCS_TRACE_LOG_TIME_THRESHOLD_MS.getKey)) + .map(_.toLong) + .fold(o)(o.setTraceLogTimeThreshold) + ) + .build() + ) + } catch { + // Hadoop and/or gcs-connector is excluded from classpath, do not try to set options + case _: NoClassDefFoundError | _: NoSuchMethodException => + } + } + private[scio] def labels: Map[String, String] = (for { // Check if class is in classpath